Skip to content

消息队列_RabbitMQ_基础使用

交换机类型

(交换机 → 队列)

  • Direct Exchange
    • binding key 与消息的 routing key 完全匹配队列
  • Topic Exchange
    • 模式匹配
  • Fanout Exchange
  • 广播(忽略 routing key
  • Headers Exchange
    • 不依赖 routing key,头部属性匹配

RabbitMQ广播和直接模式示例

项目参考: https://gitee.com/yidao620/springboot-bucket

这个项目的最后更新时间是五年前, 这里建议单独打开一下 springboot-rabbitmq

本地的 maven 版本是 3.5.2 , springboot-rabbitmq 依赖中的 maven-compiler-plugin 改为了 3.6.1 后没有报错

在 SpringBoot 中,使用消息队列需要引入 amqp 的依赖

<dependency>  
   <groupId>org.springframework.boot</groupId>  
   <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

yml 配置内容

spring:  
  profiles: dev  
  rabbitmq:  
    host: 127.0.0.1  
    port: 5672  
    username: guest  
    password: guest

配置类

在这个项目中,定义了一个配置类 RabbitConfig

package com.xncoding.pos.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * RabbitConfig
 *
 * @version 1.0
 * @since 2018/3/1
 */
@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 定制化 AMQP 模版。
     * 设置消息转换器、编码、消息确认和返回回调。
     *
     * @return the amqp template
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);

        // 使用 Jackson 作为消息转换器,自动将消息转换为 JSON 格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        // 设置字符编码为 UTF-8
        rabbitTemplate.setEncoding("UTF-8");

        // 消息发送失败时返回到队列,需要在配置文件中设置 publisher-returns: true
        rabbitTemplate.setMandatory(true);

        // 定义消息发送失败的回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });

        // 定义消息发送到交换机确认回调,需要在配置文件中设置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });

        return rabbitTemplate;
    }

    // Direct Exchange 配置部分

    /**
     * 声明 Direct 交换机。
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        // 创建并返回一个持久化的 Direct 交换机
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 声明队列。
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        // 创建并返回一个持久化的队列
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 将队列绑定到交换机。
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                                 @Qualifier("directExchange") Exchange exchange) {
        // 将队列绑定到交换机,使用路由键 "DIRECT_ROUTING_KEY"
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }

    // Fanout Exchange 配置部分

    /**
     * 声明 Fanout 交换机。
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        // 创建并返回一个持久化的 Fanout 交换机
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * 声明队列 A。
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        // 创建并返回一个持久化的队列 A
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * 声明队列 B。
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        // 创建并返回一个持久化的队列 B
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 将队列 A 绑定到 Fanout 交换机。
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        // 将队列 A 绑定到 Fanout 交换机
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 将队列 B 绑定到 Fanout 交换机。
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        // 将队列 B 绑定到 Fanout 交换机
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

这个 RabbitConfig 配置类通过 @Bean 注解声明了多个 Spring Bean,用于设置 RabbitMQ 的不同组件。

它涵盖了消息模板的定制、交换机和队列的声明,以及队列与交换机的绑定。

这样的配置提供了灵活的方式来定义消息传递的行为,包括消息格式、路由策略和队列管理,适用于不同的消息处理需求。

通过这种方式,可以在 Spring 应用中轻松地使用 RabbitMQ 进行高效的消息通信。


通过广播方式发送消息

/**
 * 发送广播模式的消息。
 * 
 * @param p 要发送的消息内容
 */
public void broadcast(String p) {
    // 创建 CorrelationData 对象,带有一个唯一的标识符。
    // 这个标识符用于消息确认过程中识别消息。
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

    // 使用 RabbitTemplate 将消息发送到指定的交换机。
    // 参数1: 交换机名称 - 这里是 "FANOUT_EXCHANGE"。
    // 参数2: 路由键 - 在 Fanout 交换机中,路由键会被忽略,所以这里传递一个空字符串。
    // 参数3: 消息内容 - 这里是传入的参数 p。
    // 参数4: CorrelationData - 包含消息的唯一标识符,用于消息跟踪和确认。
    rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
}

在前面的配置类中,是声明了 Fanout 交换机, 并且绑定两个队列在这个交换机上。

生产者类

package com.xncoding.pos.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 消息发送服务
 */
@Service
public class SenderService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 测试广播模式.
     * 在广播模式下,消息会被发送到所有绑定到交换机的队列。
     *
     * @param p 要发送的消息内容
     */
    public void broadcast(String p) {
        // 为每个消息创建一个带唯一标识符的 CorrelationData 对象
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 将消息发送到 FANOUT_EXCHANGE 交换机,路由键为空字符串
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
    }

    /**
     * 测试Direct模式.
     * 在Direct模式下,消息会被路由到具有指定路由键的队列。
     *
     * @param p 要发送的消息内容
     */
    public void direct(String p) {
        // 为每个消息创建一个带唯一标识符的 CorrelationData 对象
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 将消息发送到 DIRECT_EXCHANGE 交换机,并指定路由键
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
    }
}

消费者类

package com.xncoding.pos.mq;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 消息监听器
 */
@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    /**
     * FANOUT广播队列监听一.
     * 在 FANOUT_QUEUE_A 队列上监听消息。
     *
     * @param message 接收到的消息
     * @param channel 通信通道
     * @throws IOException 在消息确认过程中可能抛出的异常
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        // 确认消息已被正确接收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        // 记录接收到的消息
        log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
    }

    /**
     * FANOUT广播队列监听二.
     * 在 FANOUT_QUEUE_B 队列上监听消息。
     *
     * @param message 接收到的消息
     * @param channel 通信通道
     * @throws IOException 在消息确认过程中可能抛出的异常
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        // 确认消息已被正确接收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        // 记录接收到的消息
        log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     * 在 DIRECT_QUEUE 队列上监听消息。
     *
     * @param message 接收到的消息
     * @param channel 通信通道
     * @throws IOException 在消息确认过程中可能抛出的异常
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        // 确认消息已被正确接收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        // 记录接收到的消息
        log.debug("DIRECT " + new String(message.getBody()));
    }
}
  • 在生产者类(SenderService)中,两个方法分别演示了如何在广播(Fanout)和直接(Direct)模式下发送消息。
  • 在消费者类(Receiver)中,每个方法都使用了 @RabbitListener 注解来监听特定的队列,并在接收到消息后执行相应的处理逻辑。

这里由于你配置了接收的监听,因此当你发送的时候你就收到了这条消息,并且通过日志的方式打印了出来,如果需要比较明显一点的观察现象,可以加一个等待时间确保被消费。

// 等待一段时间以确保消息被消费 
Thread.sleep(1000);

主题交换机(Topic Exchange)的使用示例

Topic Exchange 直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。

这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"* " 、 "#"。需要注意的是通配符前面必须要加上"."符号。

* 符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。 # 符号:匹配一个或多个词。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。

image.png

使用示例

在前文的基础上,新增一些内容

配置类(更新配置类以包含一个主题交换机和两个队列,以及它们的绑定)

@Configuration
public class RabbitConfig {

    // ... 其他配置 ...

    // Topic Exchange 配置
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("TOPIC_EXCHANGE");
    }

    @Bean
    public Queue topicQueueOneWord() {
        return new Queue("TOPIC_QUEUE_ONE_WORD");
    }

    @Bean
    public Queue topicQueueMultipleWords() {
        return new Queue("TOPIC_QUEUE_MULTIPLE_WORDS");
    }

    // 绑定键 "topic.*" 只匹配一个单词
    @Bean
    public Binding bindingTopicOneWord(Queue topicQueueOneWord, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueOneWord).to(topicExchange).with("topic.*");
    }

    // 绑定键 "topic.#" 匹配零个或多个单词
    @Bean
    public Binding bindingTopicMultipleWords(Queue topicQueueMultipleWords, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueueMultipleWords).to(topicExchange).with("topic.#");
    }
}

生产者类

@Service
public class SenderService {

    // ... 其他方法 ...

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到主题交换机
     */
    public void sendToTopic(String routingKey, String message) {
        rabbitTemplate.convertAndSend("TOPIC_EXCHANGE", routingKey, message);
    }
}

消费者类

@Component
public class Receiver {

    // ... 其他方法 ...

    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    @RabbitListener(queues = {"TOPIC_QUEUE_ONE_WORD"})
    public void receiveFromTopicOneWord(Message message) {
        log.debug("Received in TOPIC_QUEUE_ONE_WORD: " + new String(message.getBody()));
    }

    @RabbitListener(queues = {"TOPIC_QUEUE_MULTIPLE_WORDS"})
    public void receiveFromTopicMultipleWords(Message message) {
        log.debug("Received in TOPIC_QUEUE_MULTIPLE_WORDS: " + new String(message.getBody()));
    }
}

测试类(发送消息并验证它们被正确路由到对应的队列:)

@SpringBootTest
public class RabbitMqTest {

    // ... 其他测试 ...

    @Autowired
    private SenderService senderService;

    @Test
    public void testTopicExchange() throws InterruptedException {
        // 发送消息,路由键匹配 "topic.*",应该只被 TOPIC_QUEUE_ONE_WORD 接收
        senderService.sendToTopic("topic.one", "Message for topic.one");

        // 发送消息,路由键匹配 "topic.#",应该被两个队列接收
        senderService.sendToTopic("topic.one.two", "Message for topic.one.two");

        // 等待一段时间以确保消息被消费
        Thread.sleep(1000);
    }
}
  • 第一个测试消息使用路由键 "topic.one",它符合 * 的匹配规则(匹配一个单词),因此只有 "TOPIC_QUEUE_ONE_WORD" 队列接收到这个消息。
  • 第二个测试消息使用路由键 "topic.one.two",它符合 # 的匹配规则(匹配多个单词),因此两个队列都接收到这个消息。

运行测试后,通过日志输出可以验证这些匹配规则是否按预期工作。

小结

在 RabbitMQ中比较常用的三种模式是:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。

熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。

通配符(TopicExchange)这种模式也可以达到直连(DirectExchange)和发布订阅(FanoutExchange)这两种的效果。

FanoutExchange不需要绑定routingKey,所以性能相对TopicExchange会好一些。

总结

交换机类型对比

交换机类型路由依据支持路由键匹配方式场景举例
Direct Exchangerouting key完全匹配(binding key = routing key)点对点消息
Topic Exchangerouting key(通配符)模式匹配(*、# 通配符)分类订阅/分组
Fanout Exchange广播所有绑定队列群发/广播
Headers Exchangeheader属性根据消息头属性高级过滤场景

备注:

  • * 通配符:匹配单个词。
  • # 通配符:匹配零个或多个词。
  • Direct 适合精准投递,Topic 适合按主题分类,Fanout 适合全员广播,Headers 适合复杂消息属性过滤。

希望这个表格可以帮助你快速理解各种交换机类型的区别!